瀏覽代碼

Decode remaining parts of fetch responses

Evan Huus 12 年之前
父節點
當前提交
1862bd8823
共有 4 個文件被更改,包括 22 次插入2 次删除
  1. 12 1
      fetch_response.go
  2. 1 1
      message_set.go
  3. 1 0
      packet_decoder.go
  4. 8 0
      real_decoder.go

+ 12 - 1
fetch_response.go

@@ -23,7 +23,18 @@ func (pr *fetchResponsePartitionBlock) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	return nil
+	msgSetSize, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+
+	msgSetDecoder, err := pd.getSubset(int(msgSetSize))
+	if err != nil {
+		return err
+	}
+	err = (&pr.msgSet).decode(msgSetDecoder)
+
+	return err
 }
 
 type fetchResponseTopicBlock struct {

+ 1 - 1
message_set.go

@@ -47,7 +47,7 @@ func (ms *messageSet) encode(pe packetEncoder) {
 }
 
 func (ms *messageSet) decode(pd packetDecoder) (err error) {
-	ms.msgs = make([]*messageSetBlock, 0)
+	ms.msgs = nil
 
 	for pd.remaining() > 0 {
 		msb := new(messageSetBlock)

+ 1 - 0
packet_decoder.go

@@ -17,6 +17,7 @@ type packetDecoder interface {
 	getError() (KError, error)
 	getString() (*string, error)
 	getBytes() (*[]byte, error)
+	getSubset(length int) (packetDecoder, error)
 
 	// stackable
 	push(in pushDecoder) error

+ 8 - 0
real_decoder.go

@@ -146,6 +146,14 @@ func (rd *realDecoder) getBytes() (*[]byte, error) {
 	}
 }
 
+func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
+	if rd.remaining() < length {
+		return nil, DecodingError{"Not enough data for subset."}
+	}
+
+	return &realDecoder{raw: rd.raw[rd.off : rd.off+length]}, nil
+}
+
 // stackable
 
 func (rd *realDecoder) push(in pushDecoder) error {