offset_request.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package protocol
  2. import enc "sarama/encoding"
  3. import "sarama/types"
  4. type offsetRequestBlock struct {
  5. time types.OffsetTime
  6. maxOffsets int32
  7. }
  8. func (r *offsetRequestBlock) Encode(pe enc.PacketEncoder) error {
  9. pe.PutInt64(int64(r.time))
  10. pe.PutInt32(r.maxOffsets)
  11. return nil
  12. }
  13. type OffsetRequest struct {
  14. blocks map[string]map[int32]*offsetRequestBlock
  15. }
  16. func (r *OffsetRequest) Encode(pe enc.PacketEncoder) error {
  17. pe.PutInt32(-1) // replica ID is always -1 for clients
  18. err := pe.PutArrayLength(len(r.blocks))
  19. if err != nil {
  20. return err
  21. }
  22. for topic, partitions := range r.blocks {
  23. err = pe.PutString(topic)
  24. if err != nil {
  25. return err
  26. }
  27. err = pe.PutArrayLength(len(partitions))
  28. if err != nil {
  29. return err
  30. }
  31. for partition, block := range partitions {
  32. pe.PutInt32(partition)
  33. err = block.Encode(pe)
  34. if err != nil {
  35. return err
  36. }
  37. }
  38. }
  39. return nil
  40. }
  41. func (r *OffsetRequest) key() int16 {
  42. return 2
  43. }
  44. func (r *OffsetRequest) version() int16 {
  45. return 0
  46. }
  47. func (r *OffsetRequest) AddBlock(topic string, partition_id int32, time types.OffsetTime, maxOffsets int32) {
  48. if r.blocks == nil {
  49. r.blocks = make(map[string]map[int32]*offsetRequestBlock)
  50. }
  51. if r.blocks[topic] == nil {
  52. r.blocks[topic] = make(map[int32]*offsetRequestBlock)
  53. }
  54. tmp := new(offsetRequestBlock)
  55. tmp.time = time
  56. tmp.maxOffsets = maxOffsets
  57. r.blocks[topic][partition_id] = tmp
  58. }