// offset_request.go (1.8 KB)

package sarama
  // OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64
  // value will be interpreted as milliseconds, or use the special constants defined here.
  type OffsetTime int64
  5. const (
  6. // LatestOffsets askes for the latest offsets.
  7. LatestOffsets OffsetTime = -1
  8. // EarliestOffset askes for the earliest available offset. Note that because offsets are pulled in descending order,
  9. // asking for the earliest offset will always return you a single element.
  10. EarliestOffset OffsetTime = -2
  11. )
  12. type offsetRequestBlock struct {
  13. time OffsetTime
  14. maxOffsets int32
  15. }
  16. func (r *offsetRequestBlock) encode(pe packetEncoder) error {
  17. pe.putInt64(int64(r.time))
  18. pe.putInt32(r.maxOffsets)
  19. return nil
  20. }
  21. type OffsetRequest struct {
  22. blocks map[string]map[int32]*offsetRequestBlock
  23. }
  24. func (r *OffsetRequest) encode(pe packetEncoder) error {
  25. pe.putInt32(-1) // replica ID is always -1 for clients
  26. err := pe.putArrayLength(len(r.blocks))
  27. if err != nil {
  28. return err
  29. }
  30. for topic, partitions := range r.blocks {
  31. err = pe.putString(topic)
  32. if err != nil {
  33. return err
  34. }
  35. err = pe.putArrayLength(len(partitions))
  36. if err != nil {
  37. return err
  38. }
  39. for partition, block := range partitions {
  40. pe.putInt32(partition)
  41. err = block.encode(pe)
  42. if err != nil {
  43. return err
  44. }
  45. }
  46. }
  47. return nil
  48. }
  49. func (r *OffsetRequest) key() int16 {
  50. return 2
  51. }
  52. func (r *OffsetRequest) version() int16 {
  53. return 0
  54. }
  55. func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time OffsetTime, maxOffsets int32) {
  56. if r.blocks == nil {
  57. r.blocks = make(map[string]map[int32]*offsetRequestBlock)
  58. }
  59. if r.blocks[topic] == nil {
  60. r.blocks[topic] = make(map[int32]*offsetRequestBlock)
  61. }
  62. tmp := new(offsetRequestBlock)
  63. tmp.time = time
  64. tmp.maxOffsets = maxOffsets
  65. r.blocks[topic][partitionID] = tmp
  66. }