Browse Source

Add JoinGroup request and response pair.

Willem van Bergen 10 years ago
parent
commit
1badb8e8c4
7 changed files with 243 additions and 0 deletions
  1. 94 0
      join_group_request.go
  2. 90 0
      join_group_response.go
  3. 1 0
      packet_decoder.go
  4. 1 0
      packet_encoder.go
  5. 15 0
      prep_encoder.go
  6. 27 0
      real_decoder.go
  7. 15 0
      real_encoder.go

+ 94 - 0
join_group_request.go

@@ -0,0 +1,94 @@
+package sarama
+
+type JoinGroupRequest struct {
+	GroupId        string
+	SessionTimeout int32
+	MemberId       string
+	ProtocolType   string
+	GroupProtocols map[string][]byte
+}
+
+func (r *JoinGroupRequest) encode(pe packetEncoder) error {
+	if err := pe.putString(r.GroupId); err != nil {
+		return err
+	}
+	pe.putInt32(r.SessionTimeout)
+	if err := pe.putString(r.MemberId); err != nil {
+		return err
+	}
+	if err := pe.putString(r.ProtocolType); err != nil {
+		return err
+	}
+
+	if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
+		return err
+	}
+	for name, metadata := range r.GroupProtocols {
+		if err := pe.putString(name); err != nil {
+			return err
+		}
+		if err := pe.putBytes(metadata); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) {
+	if r.GroupId, err = pd.getString(); err != nil {
+		return
+	}
+
+	if r.SessionTimeout, err = pd.getInt32(); err != nil {
+		return
+	}
+
+	if r.MemberId, err = pd.getString(); err != nil {
+		return
+	}
+
+	if r.ProtocolType, err = pd.getString(); err != nil {
+		return
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	if n == 0 {
+		return nil
+	}
+
+	r.GroupProtocols = make(map[string][]byte)
+	for i := 0; i < n; i++ {
+		name, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		metadata, err := pd.getBytes()
+		if err != nil {
+			return err
+		}
+
+		r.GroupProtocols[name] = metadata
+	}
+
+	return nil
+}
+
+func (r *JoinGroupRequest) key() int16 {
+	return 11
+}
+
+func (r *JoinGroupRequest) version() int16 {
+	return 0
+}
+
+func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
+	if r.GroupProtocols == nil {
+		r.GroupProtocols = make(map[string][]byte)
+	}
+
+	r.GroupProtocols[name] = metadata
+}

+ 90 - 0
join_group_response.go

@@ -0,0 +1,90 @@
+package sarama
+
+type JoinGroupResponse struct {
+	Err           KError
+	GenerationId  int32
+	GroupProtocol string
+	LeaderId      string
+	MemberId      string
+	Members       map[string][]byte
+}
+
+func (r *JoinGroupResponse) encode(pe packetEncoder) error {
+	pe.putInt16(int16(r.Err))
+	pe.putInt32(r.GenerationId)
+
+	if err := pe.putString(r.GroupProtocol); err != nil {
+		return err
+	}
+	if err := pe.putString(r.LeaderId); err != nil {
+		return err
+	}
+	if err := pe.putString(r.MemberId); err != nil {
+		return err
+	}
+
+	if err := pe.putArrayLength(len(r.Members)); err != nil {
+		return err
+	}
+
+	for memberId, memberMetadata := range r.Members {
+		if err := pe.putString(memberId); err != nil {
+			return err
+		}
+
+		if err := pe.putBytes(memberMetadata); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) {
+	if kerr, err := pd.getInt16(); err != nil {
+		return err
+	} else {
+		r.Err = KError(kerr)
+	}
+
+	if r.GenerationId, err = pd.getInt32(); err != nil {
+		return
+	}
+
+	if r.GroupProtocol, err = pd.getString(); err != nil {
+		return
+	}
+
+	if r.LeaderId, err = pd.getString(); err != nil {
+		return
+	}
+
+	if r.MemberId, err = pd.getString(); err != nil {
+		return
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	if n == 0 {
+		return nil
+	}
+
+	r.Members = make(map[string][]byte)
+	for i := 0; i < n; i++ {
+		memberId, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		memberMetadata, err := pd.getBytes()
+		if err != nil {
+			return err
+		}
+
+		r.Members[memberId] = memberMetadata
+	}
+
+	return nil
+}

+ 1 - 0
packet_decoder.go

@@ -16,6 +16,7 @@ type packetDecoder interface {
 	getString() (string, error)
 	getInt32Array() ([]int32, error)
 	getInt64Array() ([]int64, error)
+	getStringArray() ([]string, error)
 
 	// Subsets
 	remaining() int

+ 1 - 0
packet_encoder.go

@@ -15,6 +15,7 @@ type packetEncoder interface {
 	putBytes(in []byte) error
 	putRawBytes(in []byte) error
 	putString(in string) error
+	putStringArray(in []string) error
 	putInt32Array(in []int32) error
 	putInt64Array(in []int64) error
 

+ 15 - 0
prep_encoder.go

@@ -66,6 +66,21 @@ func (pe *prepEncoder) putString(in string) error {
 	return nil
 }
 
+func (pe *prepEncoder) putStringArray(in []string) error {
+	err := pe.putArrayLength(len(in))
+	if err != nil {
+		return err
+	}
+
+	for _, str := range in {
+		if err := pe.putString(str); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (pe *prepEncoder) putInt32Array(in []int32) error {
 	err := pe.putArrayLength(len(in))
 	if err != nil {

+ 27 - 0
real_decoder.go

@@ -181,6 +181,33 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) {
 	return ret, nil
 }
 
+func (rd *realDecoder) getStringArray() ([]string, error) {
+	if rd.remaining() < 4 {
+		rd.off = len(rd.raw)
+		return nil, ErrInsufficientData
+	}
+	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+	rd.off += 4
+
+	if n == 0 {
+		return nil, nil
+	}
+
+	if n < 0 {
+		return nil, PacketDecodingError{"invalid array length"}
+	}
+
+	ret := make([]string, n)
+	for i := range ret {
+		if str, err := rd.getString(); err != nil {
+			return nil, err
+		} else {
+			ret[i] = str
+		}
+	}
+	return ret, nil
+}
+
 // subsets
 
 func (rd *realDecoder) remaining() int {

+ 15 - 0
real_encoder.go

@@ -61,6 +61,21 @@ func (re *realEncoder) putString(in string) error {
 	return nil
 }
 
+func (re *realEncoder) putStringArray(in []string) error {
+	err := re.putArrayLength(len(in))
+	if err != nil {
+		return err
+	}
+
+	for _, val := range in {
+		if err := re.putString(val); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (re *realEncoder) putInt32Array(in []int32) error {
 	err := re.putArrayLength(len(in))
 	if err != nil {