Browse Source

Merge pull request #588 from dim/kafka-09-consumer-groups

Consumer groups on Kafka 0.9
Evan Huus 10 years ago
parent
commit
66d77e132f

+ 44 - 0
broker.go

@@ -240,6 +240,50 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse,
 	return response, nil
 	return response, nil
 }
 }
 
 
+func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
+	response := new(JoinGroupResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
+	response := new(SyncGroupResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
+	response := new(LeaveGroupResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
+	response := new(HeartbeatResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
 func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	b.lock.Lock()
 	defer b.lock.Unlock()
 	defer b.lock.Unlock()

+ 48 - 0
broker_test.go

@@ -176,4 +176,52 @@ var brokerTestTable = []struct {
 				t.Error("Offset request got no response!")
 				t.Error("Offset request got no response!")
 			}
 			}
 		}},
 		}},
+
+	{[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+		func(t *testing.T, broker *Broker) {
+			request := JoinGroupRequest{}
+			response, err := broker.JoinGroup(&request)
+			if err != nil {
+				t.Error(err)
+			}
+			if response == nil {
+				t.Error("JoinGroup request got no response!")
+			}
+		}},
+
+	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+		func(t *testing.T, broker *Broker) {
+			request := SyncGroupRequest{}
+			response, err := broker.SyncGroup(&request)
+			if err != nil {
+				t.Error(err)
+			}
+			if response == nil {
+				t.Error("SyncGroup request got no response!")
+			}
+		}},
+
+	{[]byte{0x00, 0x00},
+		func(t *testing.T, broker *Broker) {
+			request := LeaveGroupRequest{}
+			response, err := broker.LeaveGroup(&request)
+			if err != nil {
+				t.Error(err)
+			}
+			if response == nil {
+				t.Error("LeaveGroup request got no response!")
+			}
+		}},
+
+	{[]byte{0x00, 0x00},
+		func(t *testing.T, broker *Broker) {
+			request := HeartbeatRequest{}
+			response, err := broker.Heartbeat(&request)
+			if err != nil {
+				t.Error(err)
+			}
+			if response == nil {
+				t.Error("Heartbeat request got no response!")
+			}
+		}},
 }
 }

+ 94 - 0
consumer_group_members.go

@@ -0,0 +1,94 @@
+package sarama
+
+type ConsumerGroupMemberMetadata struct {
+	Version  int16
+	Topics   []string
+	UserData []byte
+}
+
+func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
+	pe.putInt16(m.Version)
+
+	if err := pe.putStringArray(m.Topics); err != nil {
+		return err
+	}
+
+	if err := pe.putBytes(m.UserData); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
+	if m.Version, err = pd.getInt16(); err != nil {
+		return
+	}
+
+	if m.Topics, err = pd.getStringArray(); err != nil {
+		return
+	}
+
+	if m.UserData, err = pd.getBytes(); err != nil {
+		return
+	}
+
+	return nil
+}
+
+type ConsumerGroupMemberAssignment struct {
+	Version  int16
+	Topics   map[string][]int32
+	UserData []byte
+}
+
+func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error {
+	pe.putInt16(m.Version)
+
+	if err := pe.putArrayLength(len(m.Topics)); err != nil {
+		return err
+	}
+
+	for topic, partitions := range m.Topics {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := pe.putInt32Array(partitions); err != nil {
+			return err
+		}
+	}
+
+	if err := pe.putBytes(m.UserData); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) {
+	if m.Version, err = pd.getInt16(); err != nil {
+		return
+	}
+
+	var topicLen int
+	if topicLen, err = pd.getArrayLength(); err != nil {
+		return
+	}
+
+	m.Topics = make(map[string][]int32, topicLen)
+	for i := 0; i < topicLen; i++ {
+		var topic string
+		if topic, err = pd.getString(); err != nil {
+			return
+		}
+		if m.Topics[topic], err = pd.getInt32Array(); err != nil {
+			return
+		}
+	}
+
+	if m.UserData, err = pd.getBytes(); err != nil {
+		return
+	}
+
+	return nil
+}

+ 77 - 0
consumer_group_members_test.go

@@ -0,0 +1,77 @@
+package sarama
+
+import (
+	"bytes"
+	"reflect"
+	"testing"
+)
+
+var (
+	groupMemberMetadata = []byte{
+		0, 1, // Version
+		0, 0, 0, 2, // Topic array length
+		0, 3, 'o', 'n', 'e', // Topic one
+		0, 3, 't', 'w', 'o', // Topic two
+		0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
+	}
+	groupMemberAssignment = []byte{
+		0, 1, // Version
+		0, 0, 0, 2, // Topic array length
+		0, 3, 'o', 'n', 'e', // Topic one
+		0, 0, 0, 3, // Topic one, partition array length
+		0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
+		0, 3, 't', 'w', 'o', // Topic two
+		0, 0, 0, 2, // Topic two, partition array length
+		0, 0, 0, 1, 0, 0, 0, 3, // 1, 3
+		0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
+	}
+)
+
+func TestConsumerGroupMemberMetadata(t *testing.T) {
+	meta := &ConsumerGroupMemberMetadata{
+		Version:  1,
+		Topics:   []string{"one", "two"},
+		UserData: []byte{0x01, 0x02, 0x03},
+	}
+
+	buf, err := encode(meta)
+	if err != nil {
+		t.Error("Failed to encode data", err)
+	} else if !bytes.Equal(groupMemberMetadata, buf) {
+		t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf)
+	}
+
+	meta2 := new(ConsumerGroupMemberMetadata)
+	err = decode(buf, meta2)
+	if err != nil {
+		t.Error("Failed to decode data", err)
+	} else if !reflect.DeepEqual(meta, meta2) {
+		t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", meta, meta2)
+	}
+}
+
+func TestConsumerGroupMemberAssignment(t *testing.T) {
+	amt := &ConsumerGroupMemberAssignment{
+		Version: 1,
+		Topics: map[string][]int32{
+			"one": []int32{0, 2, 4},
+			"two": []int32{1, 3},
+		},
+		UserData: []byte{0x01, 0x02, 0x03},
+	}
+
+	buf, err := encode(amt)
+	if err != nil {
+		t.Error("Failed to encode data", err)
+	} else if !bytes.Equal(groupMemberAssignment, buf) {
+		t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf)
+	}
+
+	amt2 := new(ConsumerGroupMemberAssignment)
+	err = decode(buf, amt2)
+	if err != nil {
+		t.Error("Failed to decode data", err)
+	} else if !reflect.DeepEqual(amt, amt2) {
+		t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", amt, amt2)
+	}
+}

+ 10 - 0
join_group_request.go

@@ -92,3 +92,13 @@ func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
 
 
 	r.GroupProtocols[name] = metadata
 	r.GroupProtocols[name] = metadata
 }
 }
+
+func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
+	bin, err := encode(metadata)
+	if err != nil {
+		return err
+	}
+
+	r.AddGroupProtocol(name, bin)
+	return nil
+}

+ 12 - 0
join_group_response.go

@@ -9,6 +9,18 @@ type JoinGroupResponse struct {
 	Members       map[string][]byte
 	Members       map[string][]byte
 }
 }
 
 
+func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {
+	members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
+	for id, bin := range r.Members {
+		meta := new(ConsumerGroupMemberMetadata)
+		if err := decode(bin, meta); err != nil {
+			return nil, err
+		}
+		members[id] = *meta
+	}
+	return members, nil
+}
+
 func (r *JoinGroupResponse) encode(pe packetEncoder) error {
 func (r *JoinGroupResponse) encode(pe packetEncoder) error {
 	pe.putInt16(int16(r.Err))
 	pe.putInt16(int16(r.Err))
 	pe.putInt32(r.GenerationId)
 	pe.putInt32(r.GenerationId)

+ 10 - 0
sync_group_request.go

@@ -84,3 +84,13 @@ func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment
 
 
 	r.GroupAssignments[memberId] = memberAssignment
 	r.GroupAssignments[memberId] = memberAssignment
 }
 }
+
+func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error {
+	bin, err := encode(memberAssignment)
+	if err != nil {
+		return err
+	}
+
+	r.AddGroupAssignment(memberId, bin)
+	return nil
+}

+ 6 - 0
sync_group_response.go

@@ -5,6 +5,12 @@ type SyncGroupResponse struct {
 	MemberAssignment []byte
 	MemberAssignment []byte
 }
 }
 
 
+func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
+	assignment := new(ConsumerGroupMemberAssignment)
+	err := decode(r.MemberAssignment, assignment)
+	return assignment, err
+}
+
 func (r *SyncGroupResponse) encode(pe packetEncoder) error {
 func (r *SyncGroupResponse) encode(pe packetEncoder) error {
 	pe.putInt16(int16(r.Err))
 	pe.putInt16(int16(r.Err))
 	return pe.putBytes(r.MemberAssignment)
 	return pe.putBytes(r.MemberAssignment)