| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- package kafka
- type fetchRequestPartition struct {
- fetchOffset int64
- maxBytes int32
- }
- func (f *fetchRequestPartition) encode(pe packetEncoder) {
- pe.putInt64(f.fetchOffset)
- pe.putInt32(f.maxBytes)
- }
- type FetchRequest struct {
- MaxWaitTime int32
- MinBytes int32
- partitions map[*string]map[int32]*fetchRequestPartition
- }
- func (f *FetchRequest) encode(pe packetEncoder) {
- pe.putInt32(-1) // replica ID is always -1 for clients
- pe.putInt32(f.MaxWaitTime)
- pe.putInt32(f.MinBytes)
- pe.putArrayCount(len(f.partitions))
- for topic, partitions := range f.partitions {
- pe.putString(topic)
- pe.putArrayCount(len(partitions))
- for partition, block := range partitions {
- pe.putInt32(partition)
- block.encode(pe)
- }
- }
- }
- func (f *FetchRequest) key() int16 {
- return 1
- }
- func (f *FetchRequest) version() int16 {
- return 0
- }
- func (f *FetchRequest) AddPartition(topic *string, partition_id int32, fetchOffset int64, maxBytes int32) {
- if f.partitions == nil {
- f.partitions = make(map[*string]map[int32]*fetchRequestPartition)
- }
- if f.partitions[topic] == nil {
- f.partitions[topic] = make(map[int32]*fetchRequestPartition)
- }
- tmp := new(fetchRequestPartition)
- tmp.maxBytes = maxBytes
- tmp.fetchOffset = fetchOffset
- f.partitions[topic][partition_id] = tmp
- }
|