offset_fetch_response.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package sarama
  2. type OffsetFetchResponseBlock struct {
  3. Offset int64
  4. LeaderEpoch int32
  5. Metadata string
  6. Err KError
  7. }
  8. func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
  9. b.Offset, err = pd.getInt64()
  10. if err != nil {
  11. return err
  12. }
  13. if version >= 5 {
  14. b.LeaderEpoch, err = pd.getInt32()
  15. if err != nil {
  16. return err
  17. }
  18. }
  19. b.Metadata, err = pd.getString()
  20. if err != nil {
  21. return err
  22. }
  23. tmp, err := pd.getInt16()
  24. if err != nil {
  25. return err
  26. }
  27. b.Err = KError(tmp)
  28. return nil
  29. }
  30. func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
  31. pe.putInt64(b.Offset)
  32. if version >= 5 {
  33. pe.putInt32(b.LeaderEpoch)
  34. }
  35. err = pe.putString(b.Metadata)
  36. if err != nil {
  37. return err
  38. }
  39. pe.putInt16(int16(b.Err))
  40. return nil
  41. }
  42. type OffsetFetchResponse struct {
  43. Version int16
  44. ThrottleTimeMs int32
  45. Blocks map[string]map[int32]*OffsetFetchResponseBlock
  46. Err KError
  47. }
  48. func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
  49. if r.Version >= 3 {
  50. pe.putInt32(r.ThrottleTimeMs)
  51. }
  52. if err := pe.putArrayLength(len(r.Blocks)); err != nil {
  53. return err
  54. }
  55. for topic, partitions := range r.Blocks {
  56. if err := pe.putString(topic); err != nil {
  57. return err
  58. }
  59. if err := pe.putArrayLength(len(partitions)); err != nil {
  60. return err
  61. }
  62. for partition, block := range partitions {
  63. pe.putInt32(partition)
  64. if err := block.encode(pe, r.Version); err != nil {
  65. return err
  66. }
  67. }
  68. }
  69. if r.Version >= 2 {
  70. pe.putInt16(int16(r.Err))
  71. }
  72. return nil
  73. }
  74. func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
  75. r.Version = version
  76. if version >= 3 {
  77. r.ThrottleTimeMs, err = pd.getInt32()
  78. if err != nil {
  79. return err
  80. }
  81. }
  82. numTopics, err := pd.getArrayLength()
  83. if err != nil {
  84. return err
  85. }
  86. if numTopics > 0 {
  87. r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
  88. for i := 0; i < numTopics; i++ {
  89. name, err := pd.getString()
  90. if err != nil {
  91. return err
  92. }
  93. numBlocks, err := pd.getArrayLength()
  94. if err != nil {
  95. return err
  96. }
  97. if numBlocks == 0 {
  98. r.Blocks[name] = nil
  99. continue
  100. }
  101. r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
  102. for j := 0; j < numBlocks; j++ {
  103. id, err := pd.getInt32()
  104. if err != nil {
  105. return err
  106. }
  107. block := new(OffsetFetchResponseBlock)
  108. err = block.decode(pd, version)
  109. if err != nil {
  110. return err
  111. }
  112. r.Blocks[name][id] = block
  113. }
  114. }
  115. }
  116. if version >= 2 {
  117. kerr, err := pd.getInt16()
  118. if err != nil {
  119. return err
  120. }
  121. r.Err = KError(kerr)
  122. }
  123. return nil
  124. }
  125. func (r *OffsetFetchResponse) key() int16 {
  126. return 9
  127. }
  128. func (r *OffsetFetchResponse) version() int16 {
  129. return r.Version
  130. }
  131. func (r *OffsetFetchResponse) headerVersion() int16 {
  132. return 0
  133. }
  134. func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
  135. switch r.Version {
  136. case 1:
  137. return V0_8_2_0
  138. case 2:
  139. return V0_10_2_0
  140. case 3:
  141. return V0_11_0_0
  142. case 4:
  143. return V2_0_0_0
  144. case 5:
  145. return V2_1_0_0
  146. default:
  147. return MinVersion
  148. }
  149. }
  150. func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
  151. if r.Blocks == nil {
  152. return nil
  153. }
  154. if r.Blocks[topic] == nil {
  155. return nil
  156. }
  157. return r.Blocks[topic][partition]
  158. }
  159. func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) {
  160. if r.Blocks == nil {
  161. r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock)
  162. }
  163. partitions := r.Blocks[topic]
  164. if partitions == nil {
  165. partitions = make(map[int32]*OffsetFetchResponseBlock)
  166. r.Blocks[topic] = partitions
  167. }
  168. partitions[partition] = block
  169. }