Browse Source

Decode produce response

Evan Huus 12 years ago
parent
commit
b910c80692
2 changed files with 74 additions and 8 deletions
  1. 8 8
      produce_request.go
  2. 66 0
      produce_response.go

+ 8 - 8
produce_request.go

@@ -40,6 +40,14 @@ func (p *produceRequest) encode(pe packetEncoder) {
 	}
 	}
 }
 }
 
 
+func (p *produceRequest) key() int16 {
+	return 0
+}
+
+func (p *produceRequest) version() int16 {
+	return 0
+}
+
 func newSingletonProduceRequest(topic string, partition int32, set *messageSet) *produceRequest {
 func newSingletonProduceRequest(topic string, partition int32, set *messageSet) *produceRequest {
 	req := &produceRequest{topics: make([]produceRequestTopicBlock, 1)}
 	req := &produceRequest{topics: make([]produceRequestTopicBlock, 1)}
 	req.topics[0].topic = &topic
 	req.topics[0].topic = &topic
@@ -48,11 +56,3 @@ func newSingletonProduceRequest(topic string, partition int32, set *messageSet)
 	req.topics[0].partitions[0].msgSet = set
 	req.topics[0].partitions[0].msgSet = set
 	return req
 	return req
 }
 }
-
-func (p *produceRequest) key() int16 {
-	return 0
-}
-
-func (p *produceRequest) version() int16 {
-	return 0
-}

+ 66 - 0
produce_response.go

@@ -1,8 +1,74 @@
 package kafka
 package kafka
 
 
+type produceResponsePartitionBlock struct {
+	id     int32
+	err    KError
+	offset int64
+}
+
+func (pr *produceResponsePartitionBlock) decode(pd packetDecoder) (err error) {
+	pr.id, err = pd.getInt32()
+	if err != nil {
+		return err
+	}
+
+	pr.err, err = pd.getError()
+	if err != nil {
+		return err
+	}
+
+	pr.offset, err = pd.getInt64()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+type produceResponseTopicBlock struct {
+	name       *string
+	partitions []produceResponsePartitionBlock
+}
+
+func (pr *produceResponseTopicBlock) decode(pd packetDecoder) (err error) {
+	pr.name, err = pd.getString()
+	if err != nil {
+		return err
+	}
+
+	n, err := pd.getArrayCount()
+	if err != nil {
+		return err
+	}
+
+	pr.partitions = make([]produceResponsePartitionBlock, n)
+	for i := range pr.partitions {
+		err = (&pr.partitions[i]).decode(pd)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 type produceResponse struct {
 type produceResponse struct {
+	topics []produceResponseTopicBlock
 }
 }
 
 
 func (pr *produceResponse) decode(pd packetDecoder) (err error) {
 func (pr *produceResponse) decode(pd packetDecoder) (err error) {
+	n, err := pd.getArrayCount()
+	if err != nil {
+		return err
+	}
+
+	pr.topics = make([]produceResponseTopicBlock, n)
+	for i := range pr.topics {
+		err = (&pr.topics[i]).decode(pd)
+		if err != nil {
+			return err
+		}
+	}
+
 	return nil
 	return nil
 }
 }