fetch_request.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package kafka
  // fetchRequestPartition carries the two per-partition values of a fetch
  // request. Instances are created by FetchRequest.AddPartition and written
  // to the wire by encode.
  type fetchRequestPartition struct {
  	// fetchOffset is the offset supplied to AddPartition; encoded as an int64.
  	fetchOffset int64
  	// maxBytes is the byte limit supplied to AddPartition; encoded as an int32.
  	maxBytes int32
  }
  6. func (f *fetchRequestPartition) encode(pe packetEncoder) {
  7. pe.putInt64(f.fetchOffset)
  8. pe.putInt32(f.maxBytes)
  9. }
  10. type FetchRequest struct {
  11. MaxWaitTime int32
  12. MinBytes int32
  13. partitions map[*string]map[int32]*fetchRequestPartition
  14. }
  15. func (f *FetchRequest) encode(pe packetEncoder) {
  16. pe.putInt32(-1) // replica ID is always -1 for clients
  17. pe.putInt32(f.MaxWaitTime)
  18. pe.putInt32(f.MinBytes)
  19. pe.putArrayCount(len(f.partitions))
  20. for topic, partitions := range f.partitions {
  21. pe.putString(topic)
  22. pe.putArrayCount(len(partitions))
  23. for partition, block := range partitions {
  24. pe.putInt32(partition)
  25. block.encode(pe)
  26. }
  27. }
  28. }
  29. func (f *FetchRequest) key() int16 {
  30. return 1
  31. }
  32. func (f *FetchRequest) version() int16 {
  33. return 0
  34. }
  35. func (f *FetchRequest) AddPartition(topic *string, partition_id int32, fetchOffset int64, maxBytes int32) {
  36. if f.partitions == nil {
  37. f.partitions = make(map[*string]map[int32]*fetchRequestPartition)
  38. }
  39. if f.partitions[topic] == nil {
  40. f.partitions[topic] = make(map[int32]*fetchRequestPartition)
  41. }
  42. tmp := new(fetchRequestPartition)
  43. tmp.maxBytes = maxBytes
  44. tmp.fetchOffset = fetchOffset
  45. f.partitions[topic][partition_id] = tmp
  46. }