浏览代码

OffsetCommit request/response

Evan Huus 12 年之前
父节点
当前提交
c5f7eb4e02
共有 2 个文件被更改,包括 93 次插入0 次删除
  1. 52 0
      offset_commit_request.go
  2. 41 0
      offset_commit_response.go

+ 52 - 0
offset_commit_request.go

@@ -1 +1,53 @@
 package kafka
+
+type offsetCommitRequestBlock struct {
+	offset   int64
+	metadata *string
+}
+
+func (r *offsetCommitRequestBlock) encode(pe packetEncoder) {
+	pe.putInt64(r.offset)
+	pe.putString(r.metadata)
+}
+
+type OffsetCommitRequest struct {
+	ConsumerGroup *string
+	blocks        map[*string]map[int32]*offsetCommitRequestBlock
+}
+
+func (r *OffsetCommitRequest) encode(pe packetEncoder) {
+	pe.putString(r.ConsumerGroup)
+	pe.putArrayCount(len(r.blocks))
+	for topic, partitions := range r.blocks {
+		pe.putString(topic)
+		pe.putArrayCount(len(partitions))
+		for partition, block := range partitions {
+			pe.putInt32(partition)
+			block.encode(pe)
+		}
+	}
+}
+
+func (r *OffsetCommitRequest) key() int16 {
+	return 6
+}
+
+func (r *OffsetCommitRequest) version() int16 {
+	return 0
+}
+
+func (r *OffsetCommitRequest) AddBlock(topic *string, partition_id int32, offset int64, metadata *string) {
+	if r.blocks == nil {
+		r.blocks = make(map[*string]map[int32]*offsetCommitRequestBlock)
+	}
+
+	if r.blocks[topic] == nil {
+		r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
+	}
+
+	tmp := new(offsetCommitRequestBlock)
+	tmp.offset = offset
+	tmp.metadata = metadata
+
+	r.blocks[topic][partition_id] = tmp
+}

+ 41 - 0
offset_commit_response.go

@@ -1 +1,42 @@
 package kafka
+
+type OffsetCommitResponse struct {
+	Errors map[*string]map[int32]KError
+}
+
+func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) {
+	numTopics, err := pd.getArrayCount()
+	if err != nil {
+		return err
+	}
+
+	r.Errors = make(map[*string]map[int32]KError, numTopics)
+	for i := 0; i < numTopics; i++ {
+		name, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		numErrors, err := pd.getArrayCount()
+		if err != nil {
+			return err
+		}
+
+		r.Errors[name] = make(map[int32]KError, numErrors)
+
+		for j := 0; j < numErrors; j++ {
+			id, err := pd.getInt32()
+			if err != nil {
+				return err
+			}
+
+			tmp, err := pd.getError()
+			if err != nil {
+				return err
+			}
+			r.Errors[name][id] = tmp
+		}
+	}
+
+	return nil
+}