metadata_response.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. package sarama
  2. type PartitionMetadata struct {
  3. Err KError
  4. ID int32
  5. Leader int32
  6. Replicas []int32
  7. Isr []int32
  8. OfflineReplicas []int32
  9. }
  10. func (pm *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
  11. tmp, err := pd.getInt16()
  12. if err != nil {
  13. return err
  14. }
  15. pm.Err = KError(tmp)
  16. pm.ID, err = pd.getInt32()
  17. if err != nil {
  18. return err
  19. }
  20. pm.Leader, err = pd.getInt32()
  21. if err != nil {
  22. return err
  23. }
  24. pm.Replicas, err = pd.getInt32Array()
  25. if err != nil {
  26. return err
  27. }
  28. pm.Isr, err = pd.getInt32Array()
  29. if err != nil {
  30. return err
  31. }
  32. if version >= 5 {
  33. pm.OfflineReplicas, err = pd.getInt32Array()
  34. if err != nil {
  35. return err
  36. }
  37. }
  38. return nil
  39. }
  40. func (pm *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
  41. pe.putInt16(int16(pm.Err))
  42. pe.putInt32(pm.ID)
  43. pe.putInt32(pm.Leader)
  44. err = pe.putInt32Array(pm.Replicas)
  45. if err != nil {
  46. return err
  47. }
  48. err = pe.putInt32Array(pm.Isr)
  49. if err != nil {
  50. return err
  51. }
  52. if version >= 5 {
  53. err = pe.putInt32Array(pm.OfflineReplicas)
  54. if err != nil {
  55. return err
  56. }
  57. }
  58. return nil
  59. }
  60. type TopicMetadata struct {
  61. Err KError
  62. Name string
  63. IsInternal bool // Only valid for Version >= 1
  64. Partitions []*PartitionMetadata
  65. }
  66. func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
  67. tmp, err := pd.getInt16()
  68. if err != nil {
  69. return err
  70. }
  71. tm.Err = KError(tmp)
  72. tm.Name, err = pd.getString()
  73. if err != nil {
  74. return err
  75. }
  76. if version >= 1 {
  77. tm.IsInternal, err = pd.getBool()
  78. if err != nil {
  79. return err
  80. }
  81. }
  82. n, err := pd.getArrayLength()
  83. if err != nil {
  84. return err
  85. }
  86. tm.Partitions = make([]*PartitionMetadata, n)
  87. for i := 0; i < n; i++ {
  88. tm.Partitions[i] = new(PartitionMetadata)
  89. err = tm.Partitions[i].decode(pd, version)
  90. if err != nil {
  91. return err
  92. }
  93. }
  94. return nil
  95. }
  96. func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
  97. pe.putInt16(int16(tm.Err))
  98. err = pe.putString(tm.Name)
  99. if err != nil {
  100. return err
  101. }
  102. if version >= 1 {
  103. pe.putBool(tm.IsInternal)
  104. }
  105. err = pe.putArrayLength(len(tm.Partitions))
  106. if err != nil {
  107. return err
  108. }
  109. for _, pm := range tm.Partitions {
  110. err = pm.encode(pe, version)
  111. if err != nil {
  112. return err
  113. }
  114. }
  115. return nil
  116. }
  117. type MetadataResponse struct {
  118. Version int16
  119. ThrottleTimeMs int32
  120. Brokers []*Broker
  121. ClusterID *string
  122. ControllerID int32
  123. Topics []*TopicMetadata
  124. }
  125. func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
  126. r.Version = version
  127. if version >= 3 {
  128. r.ThrottleTimeMs, err = pd.getInt32()
  129. if err != nil {
  130. return err
  131. }
  132. }
  133. n, err := pd.getArrayLength()
  134. if err != nil {
  135. return err
  136. }
  137. r.Brokers = make([]*Broker, n)
  138. for i := 0; i < n; i++ {
  139. r.Brokers[i] = new(Broker)
  140. err = r.Brokers[i].decode(pd, version)
  141. if err != nil {
  142. return err
  143. }
  144. }
  145. if version >= 2 {
  146. r.ClusterID, err = pd.getNullableString()
  147. if err != nil {
  148. return err
  149. }
  150. }
  151. if version >= 1 {
  152. r.ControllerID, err = pd.getInt32()
  153. if err != nil {
  154. return err
  155. }
  156. } else {
  157. r.ControllerID = -1
  158. }
  159. n, err = pd.getArrayLength()
  160. if err != nil {
  161. return err
  162. }
  163. r.Topics = make([]*TopicMetadata, n)
  164. for i := 0; i < n; i++ {
  165. r.Topics[i] = new(TopicMetadata)
  166. err = r.Topics[i].decode(pd, version)
  167. if err != nil {
  168. return err
  169. }
  170. }
  171. return nil
  172. }
  173. func (r *MetadataResponse) encode(pe packetEncoder) error {
  174. err := pe.putArrayLength(len(r.Brokers))
  175. if err != nil {
  176. return err
  177. }
  178. for _, broker := range r.Brokers {
  179. err = broker.encode(pe, r.Version)
  180. if err != nil {
  181. return err
  182. }
  183. }
  184. if r.Version >= 1 {
  185. pe.putInt32(r.ControllerID)
  186. }
  187. err = pe.putArrayLength(len(r.Topics))
  188. if err != nil {
  189. return err
  190. }
  191. for _, tm := range r.Topics {
  192. err = tm.encode(pe, r.Version)
  193. if err != nil {
  194. return err
  195. }
  196. }
  197. return nil
  198. }
  199. func (r *MetadataResponse) key() int16 {
  200. return 3
  201. }
  202. func (r *MetadataResponse) version() int16 {
  203. return r.Version
  204. }
  205. func (r *MetadataResponse) requiredVersion() KafkaVersion {
  206. switch r.Version {
  207. case 1:
  208. return V0_10_0_0
  209. case 2:
  210. return V0_10_1_0
  211. case 3, 4:
  212. return V0_11_0_0
  213. case 5:
  214. return V1_0_0_0
  215. default:
  216. return MinVersion
  217. }
  218. }
// Testing API: helpers for populating a MetadataResponse by hand.
  220. func (r *MetadataResponse) AddBroker(addr string, id int32) {
  221. r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr})
  222. }
  223. func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
  224. var tmatch *TopicMetadata
  225. for _, tm := range r.Topics {
  226. if tm.Name == topic {
  227. tmatch = tm
  228. goto foundTopic
  229. }
  230. }
  231. tmatch = new(TopicMetadata)
  232. tmatch.Name = topic
  233. r.Topics = append(r.Topics, tmatch)
  234. foundTopic:
  235. tmatch.Err = err
  236. return tmatch
  237. }
  238. func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
  239. tmatch := r.AddTopic(topic, ErrNoError)
  240. var pmatch *PartitionMetadata
  241. for _, pm := range tmatch.Partitions {
  242. if pm.ID == partition {
  243. pmatch = pm
  244. goto foundPartition
  245. }
  246. }
  247. pmatch = new(PartitionMetadata)
  248. pmatch.ID = partition
  249. tmatch.Partitions = append(tmatch.Partitions, pmatch)
  250. foundPartition:
  251. pmatch.Leader = brokerID
  252. pmatch.Replicas = replicas
  253. pmatch.Isr = isr
  254. pmatch.Err = err
  255. }