Browse Source

OffsetFetch request/response

Evan Huus 11 years ago
parent
commit
9f01db622b
2 changed files with 94 additions and 0 deletions
  1. 30 0
      offset_fetch_request.go
  2. 64 0
      offset_fetch_response.go

+ 30 - 0
offset_fetch_request.go

@@ -1 +1,31 @@
 package kafka
+
+type OffsetFetchRequest struct {
+	ConsumerGroup *string
+	partitions    map[*string][]int32
+}
+
+func (r *OffsetFetchRequest) encode(pe packetEncoder) {
+	pe.putString(r.ConsumerGroup)
+	pe.putArrayCount(len(r.partitions))
+	for topic, partitions := range r.partitions {
+		pe.putString(topic)
+		pe.putInt32Array(partitions)
+	}
+}
+
+func (r *OffsetFetchRequest) key() int16 {
+	return 7
+}
+
+func (r *OffsetFetchRequest) version() int16 {
+	return 0
+}
+
+func (r *OffsetFetchRequest) AddPartition(topic *string, partition_id int32) {
+	if r.partitions == nil {
+		r.partitions = make(map[*string][]int32)
+	}
+
+	r.partitions[topic] = append(r.partitions[topic], partition_id)
+}

+ 64 - 0
offset_fetch_response.go

@@ -1 +1,65 @@
 package kafka
+
+type OffsetFetchResponseBlock struct {
+	Offset   int64
+	Metadata *string
+	Err      KError
+}
+
+func (r *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
+	r.Offset, err = pd.getInt64()
+	if err != nil {
+		return err
+	}
+
+	r.Metadata, err = pd.getString()
+	if err != nil {
+		return err
+	}
+
+	r.Err, err = pd.getError()
+
+	return err
+}
+
+type OffsetFetchResponse struct {
+	Blocks map[*string]map[int32]*OffsetFetchResponseBlock
+}
+
+func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
+	numTopics, err := pd.getArrayCount()
+	if err != nil {
+		return err
+	}
+
+	r.Blocks = make(map[*string]map[int32]*OffsetFetchResponseBlock, numTopics)
+	for i := 0; i < numTopics; i++ {
+		name, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		numBlocks, err := pd.getArrayCount()
+		if err != nil {
+			return err
+		}
+
+		r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
+
+		for j := 0; j < numBlocks; j++ {
+			id, err := pd.getInt32()
+			if err != nil {
+				return err
+			}
+
+			block := new(OffsetFetchResponseBlock)
+			err = block.decode(pd)
+			if err != nil {
+				return err
+			}
+			r.Blocks[name][id] = block
+		}
+	}
+
+	return nil
+}