offset_fetch_request.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package sarama
  2. type OffsetFetchRequest struct {
  3. Version int16
  4. ConsumerGroup string
  5. partitions map[string][]int32
  6. }
  7. func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
  8. if r.Version < 0 || r.Version > 5 {
  9. return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
  10. }
  11. if err = pe.putString(r.ConsumerGroup); err != nil {
  12. return err
  13. }
  14. if r.Version >= 2 && r.partitions == nil {
  15. pe.putInt32(-1)
  16. } else {
  17. if err = pe.putArrayLength(len(r.partitions)); err != nil {
  18. return err
  19. }
  20. for topic, partitions := range r.partitions {
  21. if err = pe.putString(topic); err != nil {
  22. return err
  23. }
  24. if err = pe.putInt32Array(partitions); err != nil {
  25. return err
  26. }
  27. }
  28. }
  29. return nil
  30. }
  31. func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) {
  32. r.Version = version
  33. if r.ConsumerGroup, err = pd.getString(); err != nil {
  34. return err
  35. }
  36. partitionCount, err := pd.getArrayLength()
  37. if err != nil {
  38. return err
  39. }
  40. if (partitionCount == 0 && version < 2) || partitionCount < 0 {
  41. return nil
  42. }
  43. r.partitions = make(map[string][]int32)
  44. for i := 0; i < partitionCount; i++ {
  45. topic, err := pd.getString()
  46. if err != nil {
  47. return err
  48. }
  49. partitions, err := pd.getInt32Array()
  50. if err != nil {
  51. return err
  52. }
  53. r.partitions[topic] = partitions
  54. }
  55. return nil
  56. }
  57. func (r *OffsetFetchRequest) key() int16 {
  58. return 9
  59. }
  60. func (r *OffsetFetchRequest) version() int16 {
  61. return r.Version
  62. }
  63. func (r *OffsetFetchRequest) headerVersion() int16 {
  64. return 1
  65. }
  66. func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
  67. switch r.Version {
  68. case 1:
  69. return V0_8_2_0
  70. case 2:
  71. return V0_10_2_0
  72. case 3:
  73. return V0_11_0_0
  74. case 4:
  75. return V2_0_0_0
  76. case 5:
  77. return V2_1_0_0
  78. default:
  79. return MinVersion
  80. }
  81. }
  82. func (r *OffsetFetchRequest) ZeroPartitions() {
  83. if r.partitions == nil && r.Version >= 2 {
  84. r.partitions = make(map[string][]int32)
  85. }
  86. }
  87. func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
  88. if r.partitions == nil {
  89. r.partitions = make(map[string][]int32)
  90. }
  91. r.partitions[topic] = append(r.partitions[topic], partitionID)
  92. }