metadata_response.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package sarama
  2. type PartitionMetadata struct {
  3. Err KError
  4. ID int32
  5. Leader int32
  6. Replicas []int32
  7. Isr []int32
  8. OfflineReplicas []int32
  9. }
  10. func (pm *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
  11. tmp, err := pd.getInt16()
  12. if err != nil {
  13. return err
  14. }
  15. pm.Err = KError(tmp)
  16. pm.ID, err = pd.getInt32()
  17. if err != nil {
  18. return err
  19. }
  20. pm.Leader, err = pd.getInt32()
  21. if err != nil {
  22. return err
  23. }
  24. pm.Replicas, err = pd.getInt32Array()
  25. if err != nil {
  26. return err
  27. }
  28. pm.Isr, err = pd.getInt32Array()
  29. if err != nil {
  30. return err
  31. }
  32. if version >= 5 {
  33. pm.OfflineReplicas, err = pd.getInt32Array()
  34. if err != nil {
  35. return err
  36. }
  37. }
  38. return nil
  39. }
  40. func (pm *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
  41. pe.putInt16(int16(pm.Err))
  42. pe.putInt32(pm.ID)
  43. pe.putInt32(pm.Leader)
  44. err = pe.putInt32Array(pm.Replicas)
  45. if err != nil {
  46. return err
  47. }
  48. err = pe.putInt32Array(pm.Isr)
  49. if err != nil {
  50. return err
  51. }
  52. if version >= 5 {
  53. err = pe.putInt32Array(pm.OfflineReplicas)
  54. if err != nil {
  55. return err
  56. }
  57. }
  58. return nil
  59. }
  60. type TopicMetadata struct {
  61. Err KError
  62. Name string
  63. IsInternal bool // Only valid for Version >= 1
  64. Partitions []*PartitionMetadata
  65. }
  66. func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
  67. tmp, err := pd.getInt16()
  68. if err != nil {
  69. return err
  70. }
  71. tm.Err = KError(tmp)
  72. tm.Name, err = pd.getString()
  73. if err != nil {
  74. return err
  75. }
  76. if version >= 1 {
  77. tm.IsInternal, err = pd.getBool()
  78. if err != nil {
  79. return err
  80. }
  81. }
  82. n, err := pd.getArrayLength()
  83. if err != nil {
  84. return err
  85. }
  86. tm.Partitions = make([]*PartitionMetadata, n)
  87. for i := 0; i < n; i++ {
  88. tm.Partitions[i] = new(PartitionMetadata)
  89. err = tm.Partitions[i].decode(pd, version)
  90. if err != nil {
  91. return err
  92. }
  93. }
  94. return nil
  95. }
  96. func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
  97. pe.putInt16(int16(tm.Err))
  98. err = pe.putString(tm.Name)
  99. if err != nil {
  100. return err
  101. }
  102. if version >= 1 {
  103. pe.putBool(tm.IsInternal)
  104. }
  105. err = pe.putArrayLength(len(tm.Partitions))
  106. if err != nil {
  107. return err
  108. }
  109. for _, pm := range tm.Partitions {
  110. err = pm.encode(pe, version)
  111. if err != nil {
  112. return err
  113. }
  114. }
  115. return nil
  116. }
  117. type MetadataResponse struct {
  118. Version int16
  119. ThrottleTimeMs int32
  120. Brokers []*Broker
  121. ClusterID *string
  122. ControllerID int32
  123. Topics []*TopicMetadata
  124. }
  125. func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
  126. r.Version = version
  127. if version >= 3 {
  128. r.ThrottleTimeMs, err = pd.getInt32()
  129. if err != nil {
  130. return err
  131. }
  132. }
  133. n, err := pd.getArrayLength()
  134. if err != nil {
  135. return err
  136. }
  137. r.Brokers = make([]*Broker, n)
  138. for i := 0; i < n; i++ {
  139. r.Brokers[i] = new(Broker)
  140. err = r.Brokers[i].decode(pd, version)
  141. if err != nil {
  142. return err
  143. }
  144. }
  145. if version >= 2 {
  146. r.ClusterID, err = pd.getNullableString()
  147. if err != nil {
  148. return err
  149. }
  150. }
  151. if version >= 1 {
  152. r.ControllerID, err = pd.getInt32()
  153. if err != nil {
  154. return err
  155. }
  156. } else {
  157. r.ControllerID = -1
  158. }
  159. n, err = pd.getArrayLength()
  160. if err != nil {
  161. return err
  162. }
  163. r.Topics = make([]*TopicMetadata, n)
  164. for i := 0; i < n; i++ {
  165. r.Topics[i] = new(TopicMetadata)
  166. err = r.Topics[i].decode(pd, version)
  167. if err != nil {
  168. return err
  169. }
  170. }
  171. return nil
  172. }
  173. func (r *MetadataResponse) encode(pe packetEncoder) error {
  174. if r.Version >= 3 {
  175. pe.putInt32(r.ThrottleTimeMs)
  176. }
  177. err := pe.putArrayLength(len(r.Brokers))
  178. if err != nil {
  179. return err
  180. }
  181. for _, broker := range r.Brokers {
  182. err = broker.encode(pe, r.Version)
  183. if err != nil {
  184. return err
  185. }
  186. }
  187. if r.Version >= 2 {
  188. err := pe.putNullableString(r.ClusterID)
  189. if err != nil {
  190. return err
  191. }
  192. }
  193. if r.Version >= 1 {
  194. pe.putInt32(r.ControllerID)
  195. }
  196. err = pe.putArrayLength(len(r.Topics))
  197. if err != nil {
  198. return err
  199. }
  200. for _, tm := range r.Topics {
  201. err = tm.encode(pe, r.Version)
  202. if err != nil {
  203. return err
  204. }
  205. }
  206. return nil
  207. }
  208. func (r *MetadataResponse) key() int16 {
  209. return 3
  210. }
  211. func (r *MetadataResponse) version() int16 {
  212. return r.Version
  213. }
  214. func (r *MetadataResponse) requiredVersion() KafkaVersion {
  215. switch r.Version {
  216. case 1:
  217. return V0_10_0_0
  218. case 2:
  219. return V0_10_1_0
  220. case 3, 4:
  221. return V0_11_0_0
  222. case 5:
  223. return V1_0_0_0
  224. default:
  225. return MinVersion
  226. }
  227. }
  228. // testing API
  229. func (r *MetadataResponse) AddBroker(addr string, id int32) {
  230. r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr})
  231. }
  232. func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
  233. var tmatch *TopicMetadata
  234. for _, tm := range r.Topics {
  235. if tm.Name == topic {
  236. tmatch = tm
  237. goto foundTopic
  238. }
  239. }
  240. tmatch = new(TopicMetadata)
  241. tmatch.Name = topic
  242. r.Topics = append(r.Topics, tmatch)
  243. foundTopic:
  244. tmatch.Err = err
  245. return tmatch
  246. }
  247. func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) {
  248. tmatch := r.AddTopic(topic, ErrNoError)
  249. var pmatch *PartitionMetadata
  250. for _, pm := range tmatch.Partitions {
  251. if pm.ID == partition {
  252. pmatch = pm
  253. goto foundPartition
  254. }
  255. }
  256. pmatch = new(PartitionMetadata)
  257. pmatch.ID = partition
  258. tmatch.Partitions = append(tmatch.Partitions, pmatch)
  259. foundPartition:
  260. pmatch.Leader = brokerID
  261. pmatch.Replicas = replicas
  262. pmatch.Isr = isr
  263. pmatch.OfflineReplicas = offline
  264. pmatch.Err = err
  265. }