offset_request.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package protocol
  2. import enc "sarama/encoding"
  3. import "sarama/types"
  // OffsetTime is used in Offset Requests to ask for all messages before a
  // certain time. Any positive value is interpreted as a number of milliseconds;
  // alternatively, use one of the special negative constants defined below.
  type OffsetTime int64

  // Special OffsetTime values.
  const (
  	// LATEST_OFFSETS asks for the latest offsets.
  	LATEST_OFFSETS OffsetTime = -1

  	// EARLIEST_OFFSET asks for the earliest available offset. Because offsets
  	// are pulled in descending order, asking for the earliest offset always
  	// returns a single element.
  	EARLIEST_OFFSET OffsetTime = -2
  )
  14. type offsetRequestBlock struct {
  15. time types.OffsetTime
  16. maxOffsets int32
  17. }
  18. func (r *offsetRequestBlock) Encode(pe enc.PacketEncoder) error {
  19. pe.PutInt64(int64(r.time))
  20. pe.PutInt32(r.maxOffsets)
  21. return nil
  22. }
  23. type OffsetRequest struct {
  24. blocks map[string]map[int32]*offsetRequestBlock
  25. }
  26. func (r *OffsetRequest) Encode(pe enc.PacketEncoder) error {
  27. pe.PutInt32(-1) // replica ID is always -1 for clients
  28. err := pe.PutArrayLength(len(r.blocks))
  29. if err != nil {
  30. return err
  31. }
  32. for topic, partitions := range r.blocks {
  33. err = pe.PutString(topic)
  34. if err != nil {
  35. return err
  36. }
  37. err = pe.PutArrayLength(len(partitions))
  38. if err != nil {
  39. return err
  40. }
  41. for partition, block := range partitions {
  42. pe.PutInt32(partition)
  43. err = block.Encode(pe)
  44. if err != nil {
  45. return err
  46. }
  47. }
  48. }
  49. return nil
  50. }
  51. func (r *OffsetRequest) key() int16 {
  52. return 2
  53. }
  54. func (r *OffsetRequest) version() int16 {
  55. return 0
  56. }
  57. func (r *OffsetRequest) AddBlock(topic string, partition_id int32, time types.OffsetTime, maxOffsets int32) {
  58. if r.blocks == nil {
  59. r.blocks = make(map[string]map[int32]*offsetRequestBlock)
  60. }
  61. if r.blocks[topic] == nil {
  62. r.blocks[topic] = make(map[int32]*offsetRequestBlock)
  63. }
  64. tmp := new(offsetRequestBlock)
  65. tmp.time = time
  66. tmp.maxOffsets = maxOffsets
  67. r.blocks[topic][partition_id] = tmp
  68. }