fetch_request.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package kafka
  2. type fetchRequestBlock struct {
  3. fetchOffset int64
  4. maxBytes int32
  5. }
  6. func (f *fetchRequestBlock) encode(pe packetEncoder) {
  7. pe.putInt64(f.fetchOffset)
  8. pe.putInt32(f.maxBytes)
  9. }
  10. type FetchRequest struct {
  11. MaxWaitTime int32
  12. MinBytes int32
  13. blocks map[*string]map[int32]*fetchRequestBlock
  14. }
  15. func (f *FetchRequest) encode(pe packetEncoder) {
  16. pe.putInt32(-1) // replica ID is always -1 for clients
  17. pe.putInt32(f.MaxWaitTime)
  18. pe.putInt32(f.MinBytes)
  19. pe.putArrayCount(len(f.blocks))
  20. for topic, blocks := range f.blocks {
  21. pe.putString(topic)
  22. pe.putArrayCount(len(blocks))
  23. for partition, block := range blocks {
  24. pe.putInt32(partition)
  25. block.encode(pe)
  26. }
  27. }
  28. }
  29. func (f *FetchRequest) key() int16 {
  30. return 1
  31. }
  32. func (f *FetchRequest) version() int16 {
  33. return 0
  34. }
  35. func (f *FetchRequest) AddBlock(topic *string, partition_id int32, fetchOffset int64, maxBytes int32) {
  36. if f.blocks == nil {
  37. f.blocks = make(map[*string]map[int32]*fetchRequestBlock)
  38. }
  39. if f.blocks[topic] == nil {
  40. f.blocks[topic] = make(map[int32]*fetchRequestBlock)
  41. }
  42. tmp := new(fetchRequestBlock)
  43. tmp.maxBytes = maxBytes
  44. tmp.fetchOffset = fetchOffset
  45. f.blocks[topic][partition_id] = tmp
  46. }