Browse Source

Implement fetch requests

Evan Huus 12 years ago
parent
commit
19099f5e88
1 changed files with 53 additions and 0 deletions
  1. 53 0
      fetch_request.go

+ 53 - 0
fetch_request.go

@@ -1 +1,54 @@
 package kafka
+
+type fetchRequestPartitionBlock struct {
+	partition   int32
+	fetchOffset int64
+	maxBytes    int32
+}
+
+func (p *fetchRequestPartitionBlock) encode(pe packetEncoder) {
+	pe.putInt32(p.partition)
+	pe.putInt64(p.fetchOffset)
+	pe.putInt32(p.maxBytes)
+}
+
+type fetchRequestTopicBlock struct {
+	topic      *string
+	partitions []fetchRequestPartitionBlock
+}
+
+func (p *fetchRequestTopicBlock) encode(pe packetEncoder) {
+	pe.putString(p.topic)
+	pe.putArrayCount(len(p.partitions))
+	for i := range p.partitions {
+		(&p.partitions[i]).encode(pe)
+	}
+}
+
+type fetchRequest struct {
+	maxWaitTime int32
+	minBytes    int32
+	topics      []fetchRequestTopicBlock
+}
+
+func (p *fetchRequest) encode(pe packetEncoder) {
+	pe.putInt32(-1) // replica ID is always -1 for clients
+	pe.putInt32(p.maxWaitTime)
+	pe.putInt32(p.minBytes)
+	pe.putArrayCount(len(p.topics))
+	for i := range p.topics {
+		(&p.topics[i]).encode(pe)
+	}
+}
+
+func (p *fetchRequest) key() int16 {
+	return 1
+}
+
+func (p *fetchRequest) version() int16 {
+	return 0
+}
+
+func (p *fetchRequest) expectResponse() bool {
+	return true
+}