fetch_request.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package protocol
  2. import enc "sarama/encoding"
  3. type fetchRequestBlock struct {
  4. fetchOffset int64
  5. maxBytes int32
  6. }
  7. func (f *fetchRequestBlock) Encode(pe enc.PacketEncoder) error {
  8. pe.PutInt64(f.fetchOffset)
  9. pe.PutInt32(f.maxBytes)
  10. return nil
  11. }
  12. type FetchRequest struct {
  13. MaxWaitTime int32
  14. MinBytes int32
  15. blocks map[string]map[int32]*fetchRequestBlock
  16. }
  17. func (f *FetchRequest) Encode(pe enc.PacketEncoder) (err error) {
  18. pe.PutInt32(-1) // replica ID is always -1 for clients
  19. pe.PutInt32(f.MaxWaitTime)
  20. pe.PutInt32(f.MinBytes)
  21. err = pe.PutArrayLength(len(f.blocks))
  22. if err != nil {
  23. return err
  24. }
  25. for topic, blocks := range f.blocks {
  26. err = pe.PutString(topic)
  27. if err != nil {
  28. return err
  29. }
  30. err = pe.PutArrayLength(len(blocks))
  31. if err != nil {
  32. return err
  33. }
  34. for partition, block := range blocks {
  35. pe.PutInt32(partition)
  36. err = block.Encode(pe)
  37. if err != nil {
  38. return err
  39. }
  40. }
  41. }
  42. return nil
  43. }
  44. func (f *FetchRequest) key() int16 {
  45. return 1
  46. }
  47. func (f *FetchRequest) version() int16 {
  48. return 0
  49. }
  50. func (f *FetchRequest) AddBlock(topic string, partition_id int32, fetchOffset int64, maxBytes int32) {
  51. if f.blocks == nil {
  52. f.blocks = make(map[string]map[int32]*fetchRequestBlock)
  53. }
  54. if f.blocks[topic] == nil {
  55. f.blocks[topic] = make(map[int32]*fetchRequestBlock)
  56. }
  57. tmp := new(fetchRequestBlock)
  58. tmp.maxBytes = maxBytes
  59. tmp.fetchOffset = fetchOffset
  60. f.blocks[topic][partition_id] = tmp
  61. }