fetch_request.go 1014 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package kafka
  2. type fetchRequestPartitionBlock struct {
  3. partition int32
  4. fetchOffset int64
  5. maxBytes int32
  6. }
  7. func (p *fetchRequestPartitionBlock) encode(pe packetEncoder) {
  8. pe.putInt32(p.partition)
  9. pe.putInt64(p.fetchOffset)
  10. pe.putInt32(p.maxBytes)
  11. }
  12. type fetchRequestTopicBlock struct {
  13. topic *string
  14. partitions []fetchRequestPartitionBlock
  15. }
  16. func (p *fetchRequestTopicBlock) encode(pe packetEncoder) {
  17. pe.putString(p.topic)
  18. pe.putArrayCount(len(p.partitions))
  19. for i := range p.partitions {
  20. (&p.partitions[i]).encode(pe)
  21. }
  22. }
  23. type fetchRequest struct {
  24. maxWaitTime int32
  25. minBytes int32
  26. topics []fetchRequestTopicBlock
  27. }
  28. func (p *fetchRequest) encode(pe packetEncoder) {
  29. pe.putInt32(-1) // replica ID is always -1 for clients
  30. pe.putInt32(p.maxWaitTime)
  31. pe.putInt32(p.minBytes)
  32. pe.putArrayCount(len(p.topics))
  33. for i := range p.topics {
  34. (&p.topics[i]).encode(pe)
  35. }
  36. }
  37. func (p *fetchRequest) key() int16 {
  38. return 1
  39. }
  40. func (p *fetchRequest) version() int16 {
  41. return 0
  42. }