// offset_request.go (1.3 KB)

  1. package protocol
  2. import enc "sarama/encoding"
  // offsetRequestBlock holds the two values stored for a single
  // topic/partition entry of an offset request.
  type offsetRequestBlock struct {
  	time       int64 // encoded first, as a 64-bit integer
  	maxOffsets int32 // encoded second, as a 32-bit integer
  }
  7. func (r *offsetRequestBlock) Encode(pe enc.PacketEncoder) error {
  8. pe.PutInt64(r.time)
  9. pe.PutInt32(r.maxOffsets)
  10. return nil
  11. }
  12. type OffsetRequest struct {
  13. blocks map[string]map[int32]*offsetRequestBlock
  14. }
  15. func (r *OffsetRequest) Encode(pe enc.PacketEncoder) error {
  16. pe.PutInt32(-1) // replica ID is always -1 for clients
  17. err := pe.PutArrayLength(len(r.blocks))
  18. if err != nil {
  19. return err
  20. }
  21. for topic, partitions := range r.blocks {
  22. err = pe.PutString(topic)
  23. if err != nil {
  24. return err
  25. }
  26. err = pe.PutArrayLength(len(partitions))
  27. if err != nil {
  28. return err
  29. }
  30. for partition, block := range partitions {
  31. pe.PutInt32(partition)
  32. err = block.Encode(pe)
  33. if err != nil {
  34. return err
  35. }
  36. }
  37. }
  38. return nil
  39. }
  40. func (r *OffsetRequest) key() int16 {
  41. return 2
  42. }
  43. func (r *OffsetRequest) version() int16 {
  44. return 0
  45. }
  46. func (r *OffsetRequest) AddBlock(topic string, partition_id int32, time int64, maxOffsets int32) {
  47. if r.blocks == nil {
  48. r.blocks = make(map[string]map[int32]*offsetRequestBlock)
  49. }
  50. if r.blocks[topic] == nil {
  51. r.blocks[topic] = make(map[int32]*offsetRequestBlock)
  52. }
  53. tmp := new(offsetRequestBlock)
  54. tmp.time = time
  55. tmp.maxOffsets = maxOffsets
  56. r.blocks[topic][partition_id] = tmp
  57. }