offset_request.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package protocol
  2. import enc "sarama/encoding"
  // offsetRequestBlock holds the two values that are written for a single
  // partition entry of an offset request: a 64-bit time and a 32-bit
  // maxOffsets count.
  type offsetRequestBlock struct {
  	time       int64 // encoded first, as an int64
  	maxOffsets int32 // encoded second, as an int32
  }
  7. func (r *offsetRequestBlock) Encode(pe enc.PacketEncoder) error {
  8. pe.putInt64(r.time)
  9. pe.putInt32(r.maxOffsets)
  10. return nil
  11. }
  12. type OffsetRequest struct {
  13. blocks map[string]map[int32]*offsetRequestBlock
  14. }
  15. func (r *OffsetRequest) Encode(pe enc.PacketEncoder) error {
  16. pe.PutInt32(-1) // replica ID is always -1 for clients
  17. err := pe.PutArrayLength(len(r.blocks))
  18. if err != nil {
  19. return err
  20. }
  21. for topic, partitions := range r.blocks {
  22. err = pe.PutString(topic)
  23. if err != nil {
  24. return err
  25. }
  26. err = pe.PutArrayLength(len(partitions))
  27. if err != nil {
  28. return err
  29. }
  30. for partition, block := range partitions {
  31. pe.PutInt32(partition)
  32. err = block.Encode(pe)
  33. if err != nil {
  34. return err
  35. }
  36. }
  37. }
  38. }
  39. func (r *OffsetRequest) key() int16 {
  40. return 2
  41. }
  42. func (r *OffsetRequest) version() int16 {
  43. return 0
  44. }
  45. func (r *OffsetRequest) AddBlock(topic string, partition_id int32, time int64, maxOffsets int32) {
  46. if r.blocks == nil {
  47. r.blocks = make(map[string]map[int32]*offsetRequestBlock)
  48. }
  49. if r.blocks[topic] == nil {
  50. r.blocks[topic] = make(map[int32]*offsetRequestBlock)
  51. }
  52. tmp := new(offsetRequestBlock)
  53. tmp.time = time
  54. tmp.maxOffsets = maxOffsets
  55. r.blocks[topic][partition_id] = tmp
  56. }