|
|
@@ -1,11 +1,11 @@
|
|
|
package kafka
|
|
|
|
|
|
-type fetchRequestPartition struct {
|
|
|
+type fetchRequestBlock struct {
|
|
|
fetchOffset int64
|
|
|
maxBytes int32
|
|
|
}
|
|
|
|
|
|
-func (f *fetchRequestPartition) encode(pe packetEncoder) {
|
|
|
+func (f *fetchRequestBlock) encode(pe packetEncoder) {
|
|
|
pe.putInt64(f.fetchOffset)
|
|
|
pe.putInt32(f.maxBytes)
|
|
|
}
|
|
|
@@ -13,18 +13,18 @@ func (f *fetchRequestPartition) encode(pe packetEncoder) {
|
|
|
type FetchRequest struct {
|
|
|
MaxWaitTime int32
|
|
|
MinBytes int32
|
|
|
- partitions map[*string]map[int32]*fetchRequestPartition
|
|
|
+ blocks map[*string]map[int32]*fetchRequestBlock
|
|
|
}
|
|
|
|
|
|
func (f *FetchRequest) encode(pe packetEncoder) {
|
|
|
pe.putInt32(-1) // replica ID is always -1 for clients
|
|
|
pe.putInt32(f.MaxWaitTime)
|
|
|
pe.putInt32(f.MinBytes)
|
|
|
- pe.putArrayCount(len(f.partitions))
|
|
|
- for topic, partitions := range f.partitions {
|
|
|
+ pe.putArrayCount(len(f.blocks))
|
|
|
+ for topic, blocks := range f.blocks {
|
|
|
pe.putString(topic)
|
|
|
- pe.putArrayCount(len(partitions))
|
|
|
- for partition, block := range partitions {
|
|
|
+ pe.putArrayCount(len(blocks))
|
|
|
+ for partition, block := range blocks {
|
|
|
pe.putInt32(partition)
|
|
|
block.encode(pe)
|
|
|
}
|
|
|
@@ -39,18 +39,18 @@ func (f *FetchRequest) version() int16 {
|
|
|
return 0
|
|
|
}
|
|
|
|
|
|
-func (f *FetchRequest) AddPartition(topic *string, partition_id int32, fetchOffset int64, maxBytes int32) {
|
|
|
- if f.partitions == nil {
|
|
|
- f.partitions = make(map[*string]map[int32]*fetchRequestPartition)
|
|
|
+func (f *FetchRequest) AddBlock(topic *string, partition_id int32, fetchOffset int64, maxBytes int32) {
|
|
|
+ if f.blocks == nil {
|
|
|
+ f.blocks = make(map[*string]map[int32]*fetchRequestBlock)
|
|
|
}
|
|
|
|
|
|
- if f.partitions[topic] == nil {
|
|
|
- f.partitions[topic] = make(map[int32]*fetchRequestPartition)
|
|
|
+ if f.blocks[topic] == nil {
|
|
|
+ f.blocks[topic] = make(map[int32]*fetchRequestBlock)
|
|
|
}
|
|
|
|
|
|
- tmp := new(fetchRequestPartition)
|
|
|
+ tmp := new(fetchRequestBlock)
|
|
|
tmp.maxBytes = maxBytes
|
|
|
tmp.fetchOffset = fetchOffset
|
|
|
|
|
|
- f.partitions[topic][partition_id] = tmp
|
|
|
+ f.blocks[topic][partition_id] = tmp
|
|
|
}
|